/*
 * Copyright 2021-2022 Open Kunlun Technology <https://www.openkunlun.io>
 */

package io.openkunlun.scaladsl.client

import akka.actor.{ ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider }
import akka.event.Logging
import io.openkunlun.scaladsl.context.AkkaGrpcContext
import io.openkunlun.scaladsl.model.State
import io.openkunlun.scaladsl.serialization.{ DaprObjectSerialization, JsonSerialization }
import io.openkunlun.scaladsl.util.Strings

import scala.concurrent.{ ExecutionContext, Future }

/**
 * Akka extension id for [[DaprClient]].
 *
 * `lookup` returns this object itself, and `createExtension` builds a new
 * [[DaprClient]] for the actor system it is given.
 *
 * @author ericxin.
 */
object DaprClient extends ExtensionId[DaprClient] with ExtensionIdProvider {

  /** The extension id to register: this companion object. */
  override def lookup: ExtensionId[DaprClient] = this

  /** Creates the [[DaprClient]] instance bound to `system`. */
  override def createExtension(system: ExtendedActorSystem): DaprClient = {
    new DaprClient(system)
  }
}
/**
 * Facade over the individual Dapr clients (state, binding, invocation), exposed as an
 * Akka [[Extension]].
 *
 * Every public method delegates to one of the underlying clients. Each accepts an implicit
 * [[DaprObjectSerialization]] which defaults to the JSON serialization created for `system`.
 *
 * NOTE(review): only `invokeMethod` enriches the caller's headers with the tags of the current
 * [[AkkaGrpcContext]]; the binding and state operations forward `headers` unchanged.
 * Confirm this asymmetry is intentional.
 *
 * @param system the actor system this extension instance belongs to
 */
class DaprClient(system: ExtendedActorSystem) extends Extension {

  private val log = Logging(system, getClass)

  // Default serialization used when the caller does not supply one implicitly.
  private val serialization: DaprObjectSerialization = JsonSerialization(system)
  private val stateClient: DaprStateClient = DaprStateClient(system)
  private val bindingClient: DaprBindingClient = DaprBindingClient(system)
  // Not referenced by any method of this class; still obtained here together with the other clients.
  private val metadataClient: DaprMetadataClient = DaprMetadataClient(system)
  private val invocationClient: DaprInvocationClient = DaprInvocationClient(system)

  // Source of the context tags that are merged into the invocation headers.
  private val contextStorage = AkkaGrpcContext(system)

  /**
   * Invokes a method through the invocation client.
   *
   * The headers actually sent are `headers` merged with the current context tags
   * (see `prepareHeaders`); they are logged at info level before the call is made.
   *
   * @param action  the invocation to perform
   * @param headers extra headers supplied by the caller
   * @tparam T expected result type
   * @return the future returned by the invocation client
   */
  def invokeMethod[T: Manifest](action: InvokeMethodAction, headers: Map[String, String] = Map.empty)(implicit ec: ExecutionContext, serialization: DaprObjectSerialization = serialization): Future[T] = {
    val allHeaders = prepareHeaders(headers)
    log.info("Invoke method[{}: {}] with headers: {}", action.id, action.method, Strings.toString(allHeaders))
    invocationClient.invokeMethod(action, allHeaders)
  }

  /**
   * Invokes a binding through the binding client; `headers` are forwarded as given.
   *
   * @param action  the binding invocation to perform
   * @param headers headers supplied by the caller
   * @tparam T expected result type
   */
  def invokeBinding[T: Manifest](action: InvokeBindingAction, headers: Map[String, String] = Map.empty)(implicit ec: ExecutionContext, serialization: DaprObjectSerialization = serialization): Future[T] =
    bindingClient.invokeBinding(action, headers)

  /**
   * Reads a single state entry through the state client; `headers` are forwarded as given.
   *
   * @param action  the get-state request
   * @param headers headers supplied by the caller
   * @tparam T type of the stored value
   */
  def getState[T: Manifest](action: GetStateAction, headers: Map[String, String] = Map.empty)(implicit ec: ExecutionContext, serialization: DaprObjectSerialization = serialization): Future[State[T]] =
    stateClient.getState(action, headers)

  /**
   * Reads several state entries through the state client; `headers` are forwarded as given.
   *
   * @param action  the bulk get-state request
   * @param headers headers supplied by the caller
   * @tparam T type of the stored values
   */
  def getBulkState[T: Manifest](action: GetBulkStateAction, headers: Map[String, String] = Map.empty)(implicit ec: ExecutionContext, serialization: DaprObjectSerialization = serialization): Future[Seq[State[T]]] =
    stateClient.getBulkState(action, headers)

  /**
   * Deletes state through the state client; `headers` are forwarded as given.
   *
   * @param action  the delete-state request
   * @param headers headers supplied by the caller
   */
  def deleteState[T: Manifest](action: DeleteStateAction, headers: Map[String, String] = Map.empty)(implicit ec: ExecutionContext, serialization: DaprObjectSerialization = serialization): Future[Unit] =
    stateClient.deleteState(action, headers)

  /**
   * Stores state through the state client; `headers` are forwarded as given.
   *
   * @param action  the set-state request
   * @param headers headers supplied by the caller
   */
  def setState[T: Manifest](action: SetStateAction, headers: Map[String, String] = Map.empty)(implicit ec: ExecutionContext, serialization: DaprObjectSerialization = serialization): Future[Unit] =
    stateClient.setState(action, headers)

  /**
   * Passes an execute-state request to the state client; `headers` are forwarded as given.
   *
   * @param action  the execute-state request
   * @param headers headers supplied by the caller
   */
  def executeState[T: Manifest](action: ExecuteStateAction, headers: Map[String, String] = Map.empty)(implicit ec: ExecutionContext, serialization: DaprObjectSerialization = serialization): Future[Unit] =
    stateClient.executeState(action, headers)

  /**
   * Builds the header map for an invocation.
   *
   * Context tags with a non-empty value are added to `headers`; on a key clash the context
   * tag replaces the caller-supplied entry. Any header named `Connection` (compared
   * case-insensitively) is then dropped.
   *
   * @param headers headers supplied by the caller
   * @return the merged headers without a `Connection` entry
   */
  private def prepareHeaders(headers: Map[String, String]): Map[String, String] = {
    val contextHeaders: Map[String, String] = contextStorage.tags()
      .map(tag => tag.key -> tag.value)
      .filter { case (_, value) => Strings.nonEmpty(value) }
      .toMap

    // io.grpc.Metadata complains that the metadata key 'Connection' should not be used: it is an
    // HTTP/1 connection-specific header which must not be forwarded, and its presence probably
    // points to an HTTP/1 conversion bug upstream.
    // Simply removing the Connection header is not enough according to RFC 7230 section 6.1:
    // every header it references should be removed as well. Only `Connection` itself is dropped here.
    (headers ++ contextHeaders).filterNot { case (name, _) => name.equalsIgnoreCase("Connection") }
  }
}
